[SPARK-18243][SQL] Port Hive writing to use FileFormat interface #16517
Conversation
@@ -99,7 +99,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
   }

   private def getFilename(taskContext: TaskAttemptContext, ext: String): String = {
-    // The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
+    // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
@rxin @jiangxb1987 do you know why we have a random UUID in the middle of the file name? We pass it into `HadoopMapReduceCommitProtocol` as `jobId`. Are we trying to avoid conflicts when multiple jobs write to the same path?
yes
Then we fixed a potential problem: the previous Hive table writer doesn't handle this case: https://github.com/apache/spark/pull/16517/files#diff-92b05808926122b334c2fdd2fd1e4221L103
After more reading, the existing Hive table writers do not have such an issue. They are based on a unique `TaskAttemptID`, which is generated by the call to `FileOutputFormat.getTaskOutputPath`.
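To make the naming concrete, here is a minimal sketch (a hypothetical helper, not the actual Spark implementation) of how a committer could assemble such a file name, with the per-job random UUID in the middle so that concurrent jobs writing to the same path cannot produce colliding file names:

```scala
import org.apache.hadoop.mapreduce.TaskAttemptContext

// Hypothetical helper, only to illustrate the file-name shape discussed above.
def exampleFilename(taskContext: TaskAttemptContext, jobId: String, ext: String): String = {
  // The task's split number gives the "part-00000" prefix; jobId is a random UUID
  // chosen once per write job, keeping concurrent jobs' output files distinct.
  val split = taskContext.getTaskAttemptID.getTaskID.getId
  f"part-$split%05d-$jobId$ext"   // e.g. part-00000-<uuid><ext>
}
```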
@@ -128,34 +128,32 @@ object FileFormatWriter extends Logging {
        .getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile)
    )

    SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
I'll submit a different PR for this bug fix.
please do
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
  case InsertIntoTable(table: MetastoreRelation, partSpec, query, overwrite, ifNotExists)
      if hasBeenPreprocessed(table.output, table.partitionKeys.toStructType, partSpec, query) =>
    InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists)
To use `FileFormatWriter`, we need to make `InsertIntoHiveTable` a `LogicalPlan` like `InsertIntoHadoopFsRelation`, which means we need to convert `InsertIntoTable` to `InsertIntoHiveTable` during analysis.
override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
  import HiveTableCommitProtocol.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER
  // This is a hack to avoid writing _SUCCESS mark file. In lower versions of Hadoop (e.g. 1.0.4),
cc @liancheng @yhuai do you have more context about it? It would be great if we can remove this hack and then remove this class.
Test build #71083 has finished for PR 16517 at commit
Test build #71085 has finished for PR 16517 at commit
Maybe a better title is "Port Hive writing to use FileFormat interface"?
Force-pushed from 0039f2f to 36c9269.
}

override def commitJob(): Unit = {
  // This is a hack to avoid writing _SUCCESS mark file. In lower versions of Hadoop (e.g. 1.0.4),
This hack is no longer needed, as we don't support Hadoop 1.x anymore.
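For context, a minimal sketch of what the hack boils down to on Hadoop 2.x (this relies on the standard `mapreduce.fileoutputcommitter.marksuccessfuljobs` flag as an assumption, and is not the exact PR code): the `_SUCCESS` marker can be switched off through the job configuration, so no custom commit protocol is needed just for that.

```scala
import org.apache.hadoop.conf.Configuration

// Sketch only: FileOutputCommitter consults this flag before writing the
// _SUCCESS marker, so disabling it avoids the marker without any subclassing.
def disableSuccessMarker(conf: Configuration): Unit = {
  conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false)
}
```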
Test build #71111 has finished for PR 16517 at commit
Force-pushed from 36c9269 to e208868.
Test build #71122 has finished for PR 16517 at commit
I manually tested a Hive table with a storage handler (a JDBC storage handler: https://github.com/qubole/Hive-JDBC-Storage-Handler), and it still works after this PR.
Force-pushed from e208868 to 49af843.
Test build #71324 has finished for PR 16517 at commit
@transient private val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
@transient private val externalCatalog = sqlContext.sharedState.externalCatalog

override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
+1
Let me see whether we can add a test case that hits the bug without it.
We can't. We only replace `InsertIntoTable` with `InsertIntoHiveTable` in the planner.
@@ -99,7 +99,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
   }

   private def getFilename(taskContext: TaskAttemptContext, ext: String): String = {
-    // The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
+    // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
The ext string always starts with c. Below is an example I got from a test case:
part-00000-fd8f3fdd-653a-4ea0-ab6d-5c8ad610b184-c000.snappy.parquet
OK, I should update this string. `c000` is the files-count, which was added recently.
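For readers puzzling over the name, a tiny sketch (hypothetical helper, not Spark code) of how the trailing piece of the example above is built:

```scala
// The suffix after the job UUID starts with "c" plus the files-count,
// followed by the codec and format extension.
def exampleFileCountSuffix(fileCount: Int, codecAndFormat: String): String =
  f"c$fileCount%03d.$codecAndFormat"

// exampleFileCountSuffix(0, "snappy.parquet") => "c000.snappy.parquet"
```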
      mode == SaveMode.Ignore)
  }

  private def hasBeenPreprocessed(
Also add a code comment for this func?
/**
 * Returns true if the [[InsertIntoTable]] plan has already been preprocessed by the analyzer rule
 * [[PreprocessTableInsertion]]. It is important that this rule ([[HiveAnalysis]]) is run after
 * [[PreprocessTableInsertion]], which normalizes the column names in the partition spec and
 * fixes the schema mismatch by adding Cast.
 */
if (mode == SaveMode.Append || mode == SaveMode.Overwrite) {
  throw new AnalysisException(
    "CTAS for hive serde tables does not support append or overwrite semantics.")
}
The code above needs to be merged from the latest master.
Left a few comments. I am not 100% sure whether
Force-pushed from 49af843 to e130e1c.
Test build #71452 has finished for PR 16517 at commit
retest this please
Test build #71461 has finished for PR 16517 at commit
 */
def hiveResultString(): Seq[String] = executedPlan match {
  case ExecutedCommandExec(desc: DescribeTableCommand) =>
    SQLExecution.withNewExecutionId(sparkSession, this) {
Could you explain the reason that `SQLExecution.withNewExecutionId(sparkSession, this)` is not needed here?
/**
 * Returns true if the [[InsertIntoTable]] plan has already been preprocessed by the analyzer rule
 * [[PreprocessTableInsertion]]. It is important that this rule ([[HiveAnalysis]]) is run after
 * [[PreprocessTableInsertion]], which normalizes the column names in the partition spec and
This rule is in the same batch as PreprocessTableInsertion, right? If so, we cannot guarantee that PreprocessTableInsertion will always fire on an InsertIntoTable command before this rule does.
Or do you mean that we use this function to determine whether PreprocessTableInsertion has fired?
Should this function actually be part of the resolved method of InsertIntoTable?
I think this version is good for now.
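For reference, roughly what such a check can look like, written as a self-contained sketch (the parameter names follow the diff above, but this is not claimed to be the exact merged code): the plan counts as preprocessed once the query is resolved, every partition column in the spec matches a table partition column name, and the query schema already lines up with the expected output columns.

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.{StructField, StructType}

def hasBeenPreprocessedSketch(
    tableOutput: Seq[Attribute],
    partSchema: StructType,
    partSpec: Map[String, Option[String]],
    query: LogicalPlan): Boolean = {
  val partColNames = partSchema.map(_.name).toSet
  query.resolved &&
    // every partition column mentioned in the spec must be a normalized partition column name
    partSpec.keys.forall(partColNames.contains) && {
      // the query must produce exactly the non-static-partition output columns
      val staticPartCols = partSpec.filter(_._2.isDefined).keySet
      val expectedColumns = tableOutput.filterNot(a => staticPartCols.contains(a.name))
      val expectedSchema =
        StructType(expectedColumns.map(a => StructField(a.name, a.dataType, a.nullable)))
      expectedSchema.sameType(query.schema)
    }
}
```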
override def inferSchema(
    sparkSession: SparkSession,
    options: Map[String, String],
    files: Seq[FileStatus]): Option[StructType] = None
Is it safe to return None?
Yes, because we are not going to use it in the read path.
OK. Let's throw an exception here.
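A minimal sketch of that suggestion (the class name and message are hypothetical, mirroring the method signature in the diff above):

```scala
import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

// Write-only format: fail loudly if the read path is ever exercised,
// instead of silently returning None.
class WriteOnlyHiveFormatSketch {
  def inferSchema(
      sparkSession: SparkSession,
      options: Map[String, String],
      files: Seq[FileStatus]): Option[StructType] = {
    throw new UnsupportedOperationException(
      "inferSchema is not supported for the Hive write-only format")
  }
}
```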
    sparkSession: SparkSession,
    job: Job,
    options: Map[String, String],
    dataSchema: StructType): OutputWriterFactory = {
Do you want to add a comment noting the original source of the code in this function?
The preparation logic was scattered around before; I collected all of it and put it here.
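For readers who don't want to chase the diff links, a condensed sketch of the kind of preparation being collected here (it assumes Hive's `TableDesc` API and the configuration key visible in the diff excerpts below; it is not the exact PR code):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.ql.exec.Utilities
import org.apache.hadoop.hive.ql.plan.TableDesc

// Sketch: wire the Hive table's output format and any storage-handler
// properties into the Hadoop job configuration before writing.
def prepareHiveOutputConf(conf: Configuration, tableDesc: TableDesc): Unit = {
  conf.set("mapred.output.format.class", tableDesc.getOutputFileFormatClassName)
  // Copy custom table/storage-handler properties so they reach the write tasks.
  Utilities.copyTableJobPropertiesToConf(tableDesc, conf)
}
```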
      context: TaskAttemptContext): OutputWriter = {
    new HiveOutputWriter(path, fileSinkConfSer, jobConf.value, dataSchema)
  }
}
Should we just create a class instead of using an anonymous class?
I followed other `FileFormat` implementations here.
    InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists)

  case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
    // Currently `DataFrameWriter.saveAsTable` doesn't support the Append mode of hive serde
The code block below is mostly moved from https://github.com/apache/spark/pull/16517/files#diff-c4ed9859978dd6ac271b6a40ee945e4bL112
    dataSchema: StructType): OutputWriterFactory = {
  val conf = job.getConfiguration
  val tableDesc = fileSinkConf.getTableInfo
  conf.set("mapred.output.format.class", tableDesc.getOutputFileFormatClassName)
val tableDesc = fileSinkConf.getTableInfo
conf.set("mapred.output.format.class", tableDesc.getOutputFileFormatClassName)

// Add table properties from storage handler to hadoopConf, so any custom storage
  jobConf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, Writable]]

override def getFileExtension(context: TaskAttemptContext): String = {
  Utilities.getFileExtension(jobConf.value, fileSinkConfSer.getCompressed, outputFormat)
private def tableDesc = fileSinkConf.getTableInfo

private val serializer = {
  serializer
}

private val hiveWriter = HiveFileFormatUtils.getHiveRecordWriter(
  new Path(path),
  Reporter.NULL)

private val standardOI = ObjectInspectorUtils
The code block below (until `def write`) is moved from https://github.com/apache/spark/pull/16517/files#diff-92b05808926122b334c2fdd2fd1e4221L167
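As a rough companion to the moved block, a condensed sketch of the per-row write path (the class and parameter names here are hypothetical stand-ins, not the PR's exact code): a row that has already been converted to a Hive struct is wrapped by the table's SerDe through its ObjectInspector and handed to Hive's own RecordWriter.

```scala
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter
import org.apache.hadoop.hive.serde2.Serializer
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector

// Sketch: serialize an already Hive-converted row struct with the table's SerDe
// and pass the resulting Writable to Hive's record writer.
class HiveWriteSketch(
    serializer: Serializer,
    standardOI: StructObjectInspector,
    hiveWriter: RecordWriter) {

  def write(hiveStruct: AnyRef): Unit = {
    val writable = serializer.serialize(hiveStruct, standardOI)
    hiveWriter.write(writable)
  }
}
```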
Test build #71548 has finished for PR 16517 at commit
// Add table properties from storage handler to hadoopConf, so any custom storage
// handler settings can be set to hadoopConf
HiveTableUtil.configureJobPropertiesForStorageHandler(tableDesc, conf, false)
Utilities.copyTableJobPropertiesToConf(tableDesc, conf)
Will tableDesc be null?
The `tableDesc` is created at https://github.com/apache/spark/pull/16517/files#diff-d579db9a8f27e0bbef37720ab14ec3f6R223, so it will never be null, and the previous null check is unnecessary.
// users that they may loss data if they are using a direct output committer.
val speculationEnabled = sqlContext.sparkContext.conf.getBoolean("spark.speculation", false)
val outputCommitterClass = jobConf.get("mapred.output.committer.class", "")
if (speculationEnabled && outputCommitterClass.contains("Direct")) {
Do we still need this?
The direct output committer has been removed already.
It seems this change is unnecessary, and users may still use a direct output committer (they can still find the code on the Internet). Let's keep the warning.
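For reference, a self-contained sketch of the warning being kept (the log sink is abstracted as a plain function here; the configuration keys are the ones visible in the diff above):

```scala
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkConf

// Sketch: warn when speculation is combined with a "Direct" output committer,
// since a retried speculative task can then lose or corrupt committed output.
def warnOnDirectCommitter(sparkConf: SparkConf, jobConf: JobConf, warn: String => Unit): Unit = {
  val speculationEnabled = sparkConf.getBoolean("spark.speculation", false)
  val outputCommitterClass = jobConf.get("mapred.output.committer.class", "")
  if (speculationEnabled && outputCommitterClass.contains("Direct")) {
    warn("spark.speculation is enabled with a direct output committer; " +
      "data may be lost if a task needs to be retried.")
  }
}
```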
Looks good to me. @gatorsmile can you explain your concerns? I am wondering what kinds of cases you think HiveFileFormat may not be able to handle.
No more questions after the latest changes. LGTM pending Jenkins
Test build #71569 has finished for PR 16517 at commit
Thanks! Merged to master.
## What changes were proposed in this pull request?

Inserting data into Hive tables has its own implementation that is distinct from data sources: `InsertIntoHiveTable`, `SparkHiveWriterContainer` and `SparkHiveDynamicPartitionWriterContainer`.

Note that one other major difference is that data source tables write directly to the final destination without using a staging directory, and then Spark itself adds the partitions/tables to the catalog. Hive tables actually write to a staging directory, and then call the Hive metastore's loadPartition/loadTable function to load that data in. So we still need to keep `InsertIntoHiveTable` to hold this special logic. In the future, we should think about writing to the Hive table location directly, so that we don't need to call `loadTable`/`loadPartition` at the end and can remove `InsertIntoHiveTable`.

This PR removes `SparkHiveWriterContainer` and `SparkHiveDynamicPartitionWriterContainer`, and creates a `HiveFileFormat` to implement the write logic. In the future, we should also implement the read logic in `HiveFileFormat`.

## How was this patch tested?

Existing tests.
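To summarize the two-phase flow described above as pseudocode (both helpers below are hypothetical placeholders, not real Spark APIs):

```scala
// Placeholders so the sketch compiles; real code would call Spark/Hive APIs here.
def writeStagedFiles(dir: String): Unit = ()
def loadStagedFilesIntoTable(src: String, table: String): Unit = ()

// Sketch of the insert-into-Hive-table flow after this PR:
//  1. tasks write files into a staging directory via FileFormatWriter/HiveFileFormat;
//  2. the driver asks the Hive metastore to load the staged files into the table
//     (loadTable/loadPartition), which is the part InsertIntoHiveTable still owns.
def insertIntoHiveTableSketch(stagingDir: String, table: String): Unit = {
  writeStagedFiles(stagingDir)                  // FileFormatWriter + HiveFileFormat write path
  loadStagedFilesIntoTable(stagingDir, table)   // metastore loadTable / loadPartition
}
```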